home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
ZSI
/
TC.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
44KB
|
1,556 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
from ZSI import _copyright, _children, _child_elements, _floattypes, _stringtypes, _seqtypes, _find_attr, _find_attrNS, _find_attrNodeNS, _find_arraytype, _find_default_namespace, _find_href, _find_encstyle, _resolve_prefix, _find_xsi_attr, _find_type, _find_xmlns_prefix, _get_element_nsuri_name, _get_idstr, _Node, EvaluateException, UNICODE_ENCODING, _valid_encoding, ParseException
from ZSI.wstools.Namespaces import SCHEMA, SOAP
from ZSI.wstools.Utility import SplitQName
from ZSI.wstools.c14n import Canonicalize
from ZSI.wstools.logging import getLogger as _GetLogger
import re
import types
import time
import copy
from base64 import decodestring as b64decode, encodestring as b64encode
from urllib import unquote as urldecode, quote as urlencode
from binascii import unhexlify as hexdecode, hexlify as hexencode
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
def _is_xsd_or_soap_ns(ns):
    """Tell whether *ns* is one of the XSD namespaces or the SOAP encoding namespace."""
    return ns in (SCHEMA.XSD3, SOAP.ENC, SCHEMA.XSD1, SCHEMA.XSD2)
def _find_nil(E):
    """Return the element's xsi:null attribute value, falling back to xsi:nil.

    The decompiler rendered the original ``a or b`` expression as an invalid
    ``if``/``pass`` pair; this restores the short-circuit form.
    """
    return _find_xsi_attr(E, 'null') or _find_xsi_attr(E, 'nil')
def _get_xsitype(pyclass):
if hasattr(pyclass, 'type') and type(pyclass.type) in _seqtypes:
return pyclass.type
elif hasattr(pyclass, 'type') and hasattr(pyclass, 'schema'):
return (pyclass.schema, pyclass.type)
return (None, None)
# Value handed back by the parse() methods in this module when an element is nilled.
Nilled = None
# NOTE(review): presumably the maxOccurs marker for unlimited repetition;
# it is not referenced elsewhere in this part of the file -- confirm against callers.
UNBOUNDED = 'unbounded'
class TypeCode:
    """Common base class for all SOAP typecodes in this module.

    Class attributes:
        tag -- optional tag; Any.serialize reads it to build a
            self-describing element name.
        type -- (namespaceURI, name) tuple reported as xsi:type.
        typechecks -- when true, constructors validate their arguments
            (consulted by Enumeration).
        attribute_typecode_dict -- maps attribute names, or
            (namespaceURI, name) tuples, to the typecodes used to parse and
            format those attributes; None when there are none.
        logger -- logger used for debug output.

    The class is deliberately kept old-style: the module tail selects
    typecode classes with ``type(x) == types.ClassType``.
    """
    tag = None
    type = (None, None)
    typechecks = True
    attribute_typecode_dict = None
    logger = _GetLogger('ZSI.TC.TypeCode')

    def __init__(self, pname=None, aname=None, minOccurs=1, maxOccurs=1,
                 nillable=False, typed=True, unique=True, pyclass=None,
                 attrs_aname='_attrs', **kw):
        """Store the element name and the serialization options.

        pname -- element name, or a (namespaceURI, name) sequence; any
            ``prefix:`` part of the name is dropped.
        aname -- attribute name; defaults to the (stripped) pname.
        minOccurs, maxOccurs, nillable, typed, unique, pyclass,
        attrs_aname -- stored unchanged on the instance.
        kw -- an ``encoded`` entry, when not None, overrides the namespace.
        """
        if type(pname) in _seqtypes:
            (self.nspname, self.pname) = pname
        else:
            self.nspname = None
            self.pname = pname
        if self.pname:
            self.pname = str(self.pname).split(':')[-1]
        self.aname = aname or self.pname
        self.minOccurs = minOccurs
        self.maxOccurs = maxOccurs
        self.nillable = nillable
        self.typed = typed
        self.unique = unique
        self.attrs_aname = attrs_aname
        self.pyclass = pyclass
        encoded = kw.get('encoded')
        if encoded is not None:
            self.nspname = encoded

    def parse(self, elt, ps):
        """Parse *elt*; must be overridden (always raises EvaluateException)."""
        raise EvaluateException('Unimplemented evaluation', ps.Backtrace(elt))

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Serialize *pyobj*; must be overridden (always raises EvaluateException)."""
        raise EvaluateException('Unimplemented evaluation', sw.Backtrace(elt))

    def text_to_data(self, text, elt, ps):
        """Convert text to a Python value; must be overridden."""
        raise EvaluateException('Unimplemented evaluation', ps.Backtrace(elt))

    def serialize_as_nil(self, elt):
        """Mark *elt* as nil by setting its xsi:nil attribute to '1'."""
        elt.setAttributeNS(SCHEMA.XSI3, 'nil', '1')

    def SimpleHREF(self, elt, ps, tag):
        """Return the element that carries the value for a simple type.

        Returns *elt* itself when it has children, the locally referenced
        element when it has an href, and None when there is neither and
        minOccurs is 0.  Raises EvaluateException otherwise; *tag* names the
        missing item in the message.
        """
        if len(_children(elt)):
            return elt
        href = _find_href(elt)
        if not href:
            # Compare by value: identity comparison against an int literal
            # only works by accident of small-int caching.
            if self.minOccurs == 0:
                return None
            raise EvaluateException('Required ' + tag + ' missing', ps.Backtrace(elt))
        return ps.FindLocalHREF(href, elt, 0)

    def get_parse_and_errorlist(self):
        """Return ``(parselist, errorlist)`` for this instance's own class.

        Only the class's own ``__dict__`` is consulted (no inheritance).
        The errorlist is the distinct type names from parselist joined with
        ' or '; it is computed once and cached on the class.
        """
        cls = self.__class__
        d = cls.__dict__
        parselist = d.get('parselist')
        errorlist = d.get('errorlist')
        if parselist and not errorlist:
            names = []
            for t in parselist:
                if t[1] not in names:
                    names.append(t[1])
            errorlist = ' or '.join(names)
            setattr(cls, 'errorlist', errorlist)
        return (parselist, errorlist)

    def checkname(self, elt, ps):
        """Check the element name and return its ``(namespaceURI, type)``.

        For an element in the SOAP encoding namespace the element name is
        itself the type and is validated against parselist.  Otherwise the
        namespace and local name must match this typecode (when set) and the
        result of checktype() is returned.  Raises EvaluateException on any
        mismatch.
        """
        (parselist, errorlist) = self.get_parse_and_errorlist()
        (ns, name) = _get_element_nsuri_name(elt)
        if ns == SOAP.ENC:
            if parselist and (None, name) not in parselist and (ns, name) not in parselist:
                raise EvaluateException('Element mismatch (got %s wanted %s) (SOAP encoding namespace)' % (name, errorlist), ps.Backtrace(elt))
            return (ns, name)
        if self.nspname and ns != self.nspname:
            raise EvaluateException('Element NS mismatch (got %s wanted %s)' % (ns, self.nspname), ps.Backtrace(elt))
        if self.pname and name != self.pname:
            raise EvaluateException('Element Name mismatch (got %s wanted %s)' % (name, self.pname), ps.Backtrace(elt))
        return self.checktype(elt, ps)

    def checktype(self, elt, ps):
        """Validate the element's xsi:type and return it as ``(uri, name)``.

        Returns (None, None) when the element has no type attribute.  An
        unresolvable prefix falls back to this typecode's namespace.  The
        type is accepted when the class has no parselist, when
        (uri, name) is listed, or when uri is an XSD/SOAP namespace and
        (None, name) is listed; otherwise EvaluateException is raised.
        """
        typeName = _find_type(elt)
        if typeName is None or typeName == '':
            return (None, None)
        (prefix, typeName) = SplitQName(typeName)
        uri = ps.GetElementNSdict(elt).get(prefix)
        if uri is None:
            uri = self.nspname
        (parselist, errorlist) = self.get_parse_and_errorlist()
        if (not parselist
                or (uri, typeName) in parselist
                or (_is_xsd_or_soap_ns(uri) and (None, typeName) in parselist)):
            return (uri, typeName)
        raise EvaluateException('Type mismatch (%s namespace) (got %s wanted %s)' % (uri, typeName, errorlist), ps.Backtrace(elt))

    def name_match(self, elt):
        """Return true when *elt* has this typecode's local name and a compatible namespace."""
        return (self.pname == elt.localName
                and self.nspname in [None, '', elt.namespaceURI])

    def nilled(self, elt, ps):
        """Return True when *elt* is marked nil ('true' or '1').

        Raises EvaluateException when the element is nil but this typecode
        is explicitly not nillable.
        """
        if _find_nil(elt) not in ('true', '1'):
            return False
        if self.nillable is False:
            raise EvaluateException('Non-nillable element is NIL', ps.Backtrace(elt))
        return True

    def simple_value(self, elt, ps, mixed=False):
        """Return the concatenated text content of *elt*.

        Unless *mixed* is true, the element must have at least one child and
        no element children.  Raises EvaluateException on invalid encoding,
        a missing value, or unexpected sub-elements.
        """
        if not _valid_encoding(elt):
            raise EvaluateException('Invalid encoding', ps.Backtrace(elt))
        c = _children(elt)
        if mixed is False:
            if len(c) == 0:
                raise EvaluateException('Value missing', ps.Backtrace(elt))
            for c_elt in c:
                if c_elt.nodeType == _Node.ELEMENT_NODE:
                    raise EvaluateException('Sub-elements in value', ps.Backtrace(c_elt))
        # NOTE(review): the decompiler lost this list comprehension
        # ("return [](_[1])"); restored as the join of text and CDATA node
        # values -- callers treat the result as a string (.strip(), .lower()).
        text_kinds = (_Node.TEXT_NODE, _Node.CDATA_SECTION_NODE)
        return ''.join([E.nodeValue for E in c if E.nodeType in text_kinds])

    def parse_attributes(self, elt, ps):
        """Parse the attributes declared in attribute_typecode_dict.

        Returns None when no attribute typecodes are declared, otherwise a
        dict keyed like attribute_typecode_dict holding the converted values
        of the attributes that are present on *elt*.
        """
        if self.attribute_typecode_dict is None:
            return None
        attributes = {}
        for attr, what in self.attribute_typecode_dict.items():
            namespaceURI = None
            localName = attr
            if type(attr) in _seqtypes:
                (namespaceURI, localName) = attr
            value = _find_attrNodeNS(elt, namespaceURI, localName)
            self.logger.debug('Parsed Attribute (%s,%s) -- %s', namespaceURI, localName, value)
            if value is None:
                continue
            attributes[attr] = what.text_to_data(value, elt, ps)
        return attributes

    def set_attributes(self, el, pyobj):
        """Copy the attribute dictionary stored on *pyobj* onto element *el*.

        The dictionary is read from ``getattr(pyobj, self.attrs_aname)``; a
        missing attribute is a no-op, a non-dict raises TypeError.  Values
        are formatted with the matching typecode from
        attribute_typecode_dict, with the value's own ``typecode`` when it
        has one, or with str() when no typecode applies.
        """
        if not hasattr(pyobj, self.attrs_aname):
            return None
        attrs = getattr(pyobj, self.attrs_aname)
        if not isinstance(attrs, dict):
            raise TypeError('pyobj.%s must be a dictionary of names and values' % self.attrs_aname)
        for attr, value in attrs.items():
            namespaceURI = None
            localName = attr
            if type(attr) in _seqtypes:
                (namespaceURI, localName) = attr
            what = None
            if getattr(self, 'attribute_typecode_dict', None) is not None:
                what = self.attribute_typecode_dict.get(attr)
                if what is None and namespaceURI is None:
                    what = self.attribute_typecode_dict.get(localName)
            if hasattr(value, 'typecode') and not isinstance(what, AnyType):
                # ``what`` is a typecode instance, so the check has to be
                # made against its class (isinstance() rejects instances).
                if what is not None and not isinstance(value.typecode, what.__class__):
                    raise EvaluateException('self-describing attribute must subclass %s' % what.__class__)
                what = value.typecode
            self.logger.debug('attribute create -- %s', value)
            if isinstance(what, QName):
                what.set_prefix(el, value)
            if what is None:
                value = str(value)
            else:
                value = what.get_formatted_content(value)
            el.setAttributeNS(namespaceURI, localName, value)

    def set_attribute_xsi_type(self, el, **kw):
        """Set xsi:type on *el* when typing is enabled and a full type is known.

        ``kw['typed']`` overrides self.typed and ``kw['type']`` overrides the
        (namespaceURI, name) derived from this typecode.
        """
        if kw.get('typed', self.typed):
            (namespaceURI, typeName) = kw.get('type', _get_xsitype(self))
            if namespaceURI and typeName:
                self.logger.debug('attribute: (%s, %s)', namespaceURI, typeName)
                el.setAttributeType(namespaceURI, typeName)

    def set_attribute_href(self, el, objid):
        """Set an ``href="#objid"`` attribute on *el*."""
        el.setAttributeNS(None, 'href', '#%s' % objid)

    def set_attribute_id(self, el, objid):
        """Set an ``id`` attribute on *el*, but only when self.unique is False."""
        if self.unique is False:
            el.setAttributeNS(None, 'id', '%s' % objid)

    def get_name(self, name, objid):
        """Return the ``(namespace, name)`` to serialize under.

        A tuple *name* is returned unchanged.  Otherwise the namespace is
        self.nspname and the name is the first non-empty of *name*,
        self.pname and ``'E' + objid``.
        """
        if type(name) is tuple:
            return name
        ns = self.nspname
        n = name or self.pname or ('E' + objid)
        return (ns, n)

    def has_attributes(self):
        """Return True when at least one attribute typecode is declared."""
        if self.attribute_typecode_dict is None:
            return False
        return len(self.attribute_typecode_dict) > 0
class SimpleType(TypeCode):
    """Typecode for elements made of a tag, attributes and a text value.

    Class attributes:
        empty_content -- value passed to text_to_data() for an empty element.
    """
    empty_content = None
    logger = _GetLogger('ZSI.TC.SimpleType')

    def parse(self, elt, ps):
        """Parse *elt* into a Python value.

        An empty element without href yields text_to_data(empty_content)
        unless it is nil, in which case Nilled is returned.  Remote hrefs
        are resolved through ``ps.ResolveHREF``; local hrefs are followed
        and type-checked.  Declared attributes, when present, are stored on
        the result under ``self.attrs_aname``.
        """
        self.checkname(elt, ps)
        if len(_children(elt)) == 0:
            href = _find_href(elt)
            if not href:
                if self.nilled(elt, ps) is False:
                    return self.text_to_data(self.empty_content, elt, ps)
                if self.nillable is True:
                    return Nilled
                raise EvaluateException('Required string missing', ps.Backtrace(elt))
            if href[0] != '#':
                return ps.ResolveHREF(href, self)
            elt = ps.FindLocalHREF(href, elt)
            self.checktype(elt, ps)
            if self.nilled(elt, ps):
                return Nilled
            if len(_children(elt)) == 0:
                v = self.empty_content
            else:
                v = self.simple_value(elt, ps)
        else:
            v = self.simple_value(elt, ps)
        pyobj = self.text_to_data(v, elt, ps)
        if self.attribute_typecode_dict is not None:
            attributes = self.parse_attributes(elt, ps)
            if attributes:
                setattr(pyobj, self.attrs_aname, attributes)
        return pyobj

    def get_formatted_content(self, pyobj):
        """Return *pyobj* formatted as text; must be overridden."""
        raise NotImplementedError('method get_formatted_content is not implemented')

    def serialize_text_node(self, elt, sw, pyobj):
        """Append the formatted value of *pyobj* to *elt* as a text node.

        Returns the new text node, or None when *pyobj* is None.  Raises
        TypeError when the formatted content is not a string.
        """
        textNode = None
        if pyobj is not None:
            text = self.get_formatted_content(pyobj)
            if type(text) not in _stringtypes:
                raise TypeError('pyobj must be a formatted string')
            textNode = elt.createAppendTextNode(text)
        return textNode

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Append a child element for *pyobj* to *elt* and return it.

        Returns None instead when the value is serialized as nil, or when it
        was already written and is emitted as an href reference.
        """
        objid = _get_idstr(pyobj)
        (ns, n) = self.get_name(name, objid)
        el = elt.createAppendElement(ns, n)
        if self.nillable is True and pyobj is Nilled:
            self.serialize_as_nil(el)
            return None
        self.set_attributes(el, pyobj)
        # The typecode's own setting wins; the keyword is only a fallback.
        unique = self.unique or kw.get('unique', False)
        if unique is False and sw.Known(orig or pyobj):
            self.set_attribute_href(el, objid)
            return None
        if kw.get('typed', self.typed) is True:
            self.set_attribute_xsi_type(el, **kw)
        if self.unique is False:
            self.set_attribute_id(el, objid)
        self.serialize_text_node(el, sw, pyobj)
        return el
class Any(TypeCode):
    """Typecode that parses and serializes values of any registered type.

    Class attributes:
        parsemap -- maps (namespaceURI, type name) to the typecode instance
            used for parsing (filled by RegisterType).
        serialmap -- maps a Python type, or (types.ClassType, class name),
            to the typecode instance used for serializing.
    """
    logger = _GetLogger('ZSI.TC.Any')
    parsemap = {}
    serialmap = {}

    def __init__(self, pname=None, aslist=False, minOccurs=0, unique=False, **kw):
        """Remember the options so nested Any instances can be built alike.

        aslist -- when true, sequences are serialized as SOAP arrays and
            compound values are parsed into lists instead of dicts.
        """
        TypeCode.__init__(self, pname, minOccurs=minOccurs, unique=unique, **kw)
        self.aslist = aslist
        self.kwargs = dict(aslist=aslist, unique=unique)
        self.kwargs.update(kw)

    def listify(self, v):
        """Turn a list of (name, value) pairs into a list or a dict.

        NOTE(review): the decompiled source only kept ``return dict(v)``;
        the ``aslist`` branch is a list comprehension the decompiler dropped
        (it did the same in simple_value) -- confirm against upstream ZSI.
        """
        if self.aslist:
            return [value for (_name, value) in v]
        return dict(v)

    def parse_into_dict_or_list(self, elt, ps):
        """Parse the child elements of *elt* with fresh typecodes of this class.

        Follows a local href when *elt* has no child elements.  Returns
        Nilled for a nil element, otherwise the listify() of the
        (local name, parsed value) pairs.
        """
        c = _child_elements(elt)
        count = len(c)
        v = []
        if count == 0:
            href = _find_href(elt)
            if not href:
                return v
            elt = ps.FindLocalHREF(href, elt)
            self.checktype(elt, ps)
            c = _child_elements(elt)
            count = len(c)
            if count == 0:
                return self.listify(v)
        if self.nilled(elt, ps):
            return Nilled
        for c_elt in c:
            v.append((str(c_elt.localName), self.__class__(**self.kwargs).parse(c_elt, ps)))
        return self.listify(v)

    def parse(self, elt, ps):
        """Parse *elt* using the typecode registered for its type.

        Untyped elements and SOAP arrays are parsed structurally; an empty
        optional element yields [] for arrays and None otherwise.  Raises
        EvaluateException when a required value is missing or no parser is
        registered for the type.
        """
        (ns, typ) = self.checkname(elt, ps)
        if not typ and self.nilled(elt, ps):
            return Nilled
        if len(_children(elt)) == 0:
            href = _find_href(elt)
            if not href:
                if self.minOccurs < 1:
                    if _is_xsd_or_soap_ns(ns):
                        parser = Any.parsemap.get((None, typ))
                        if parser:
                            return parser.parse(elt, ps)
                    if ((ns, typ) == (SOAP.ENC, 'Array')
                            or (_find_arraytype(elt) or '').endswith('[0]')):
                        return []
                    return None
                raise EvaluateException('Required Any missing', ps.Backtrace(elt))
            elt = ps.FindLocalHREF(href, elt)
            (ns, typ) = self.checktype(elt, ps)
        if not typ and elt.namespaceURI == SOAP.ENC:
            ns = SOAP.ENC
            typ = elt.localName
        if not typ or (ns, typ) == (SOAP.ENC, 'Array'):
            # NOTE(review): this branch is a list comprehension the
            # decompiler dropped; restored from upstream ZSI -- confirm.
            if self.aslist or _find_arraytype(elt):
                return [self.__class__(**self.kwargs).parse(e, ps)
                        for e in _child_elements(elt)]
            if len(_child_elements(elt)) == 0:
                return self.simple_value(elt, ps)
            return self.parse_into_dict_or_list(elt, ps)
        parser = Any.parsemap.get((ns, typ))
        if not parser and _is_xsd_or_soap_ns(ns):
            parser = Any.parsemap.get((None, typ))
        if not parser:
            raise EvaluateException("Any can't parse element", ps.Backtrace(elt))
        return parser.parse(elt, ps)

    def _find_serializer(self, pyobj):
        """Return the typecode that can serialize *pyobj*, or a false value."""
        tc = type(pyobj)
        if tc == types.InstanceType:
            if hasattr(pyobj, 'typecode'):
                serializer = pyobj.typecode
            else:
                serializer = Any.serialmap.get(pyobj.__class__)
            if not serializer:
                key = (types.ClassType, pyobj.__class__.__name__)
                serializer = Any.serialmap.get(key)
            return serializer
        serializer = Any.serialmap.get(tc)
        if not serializer and isinstance(pyobj, time.struct_time):
            # Imported here because TCtimes itself depends on this module.
            from ZSI.TCtimes import gDateTime
            serializer = gDateTime()
        return serializer

    def get_formatted_content(self, pyobj):
        """Format *pyobj* with its serializer; EvaluateException when none is found."""
        serializer = self._find_serializer(pyobj)
        if serializer:
            return serializer.get_formatted_content(pyobj)
        raise EvaluateException('Failed to find serializer for pyobj %s' % pyobj)

    def serialize(self, elt, sw, pyobj, name=None, **kw):
        """Serialize *pyobj* under *elt*.

        Objects carrying their own typecode delegate to it.  Sequences
        become a SOAP array (aslist) or a struct, dicts become an element
        with one child per key, None becomes a nil element and classic
        instances without a serializer are written as their ``__dict__``.
        Raises EvaluateException when nothing can serialize the value.
        """
        if hasattr(pyobj, 'typecode') and pyobj.typecode is not self:
            pyobj.typecode.serialize(elt, sw, pyobj, **kw)
            return None
        objid = _get_idstr(pyobj)
        (ns, n) = self.get_name(name, objid)
        kw.setdefault('typed', self.typed)
        tc = type(pyobj)
        self.logger.debug('Any serialize -- %s', tc)
        if tc in _seqtypes:
            if self.aslist:
                array = elt.createAppendElement(ns, n)
                array.setAttributeType(SOAP.ENC, 'Array')
                array.setAttributeNS(self.nspname, 'SOAP-ENC:arrayType', 'xsd:anyType[' + str(len(pyobj)) + ']')
                for o in pyobj:
                    serializer = getattr(o, 'typecode', Any(**self.kwargs))
                    serializer.serialize(array, sw, o, name='element', **kw)
            else:
                struct = elt.createAppendElement(ns, n)
                for o in pyobj:
                    serializer = getattr(o, 'typecode', Any(**self.kwargs))
                    serializer.serialize(struct, sw, o, **kw)
            return None
        kw['name'] = (ns, n)
        if tc == types.DictType:
            el = elt.createAppendElement(ns, n)
            # Dictionary members are written without a namespace; restore
            # the namespace even when a member fails to serialize.
            parentNspname = self.nspname
            self.nspname = None
            try:
                for o, m in pyobj.items():
                    if type(o) != types.StringType and type(o) != types.UnicodeType:
                        raise Exception('Dictionary implementation requires keys to be of type string (or unicode).')
                    kw['name'] = o
                    kw.setdefault('typed', True)
                    self.serialize(el, sw, m, **kw)
            finally:
                self.nspname = parentNspname
            return None
        serializer = self._find_serializer(pyobj)
        if not serializer:
            if pyobj is None:
                self.serialize_as_nil(elt.createAppendElement(ns, n))
            elif type(pyobj) != types.InstanceType:
                raise EvaluateException("Any can't serialize " + repr(pyobj))
            else:
                self.serialize(elt, sw, pyobj.__dict__, **kw)
        else:
            tag = getattr(serializer, 'tag', None)
            if self.pname is not None:
                if 'typed' not in kw:
                    kw['typed'] = False
            elif tag:
                # No explicit name: make the element name self-describing.
                if tag.find(':') == -1:
                    tag = 'SOAP-ENC:' + tag
                kw['name'] = tag
                kw['typed'] = False
            serializer.unique = self.unique
            serializer.serialize(elt, sw, pyobj, **kw)
class String(SimpleType):
    """Typecode for xsd:string values."""
    empty_content = ''
    parselist = [(None, 'string')]
    seriallist = [types.StringType, types.UnicodeType]
    type = (SCHEMA.XSD3, 'string')
    logger = _GetLogger('ZSI.TC.String')

    def __init__(self, pname=None, strip=True, **kw):
        """strip -- when true, surrounding whitespace is removed while parsing.

        A ``resolver`` keyword, when given, is stored on the instance.
        """
        TypeCode.__init__(self, pname, **kw)
        if 'resolver' in kw:
            self.resolver = kw['resolver']
        self.strip = strip

    def text_to_data(self, text, elt, ps):
        """Return *text* encoded with UNICODE_ENCODING, wrapped in pyclass when set."""
        if self.strip:
            text = text.strip()
        encoded = text.encode(UNICODE_ENCODING)
        if self.pyclass is not None:
            return self.pyclass(encoded)
        return encoded

    def get_formatted_content(self, pyobj):
        """Return *pyobj* as a byte string (non-strings go through str())."""
        if type(pyobj) not in _stringtypes:
            pyobj = str(pyobj)
        if type(pyobj) == unicode:
            return pyobj.encode(UNICODE_ENCODING)
        return pyobj
class URI(String):
    """Typecode for xsd:anyURI; values are URL-unquoted on parse and quoted on output.

    Class attributes:
        reserved -- characters left unquoted by get_formatted_content().
    """
    parselist = [(None, 'anyURI'), (SCHEMA.XSD3, 'anyURI')]
    type = (SCHEMA.XSD3, 'anyURI')
    logger = _GetLogger('ZSI.TC.URI')
    reserved = ';/?:@&=+$,'

    def text_to_data(self, text, elt, ps):
        """URL-unquote *text*, then convert it like a String."""
        return String.text_to_data(self, urldecode(text), elt, ps)

    def get_formatted_content(self, pyobj):
        """URL-quote *pyobj*, keeping the reserved characters, and format it as a String."""
        quoted = urlencode(pyobj, self.reserved)
        return String.get_formatted_content(self, quoted)
class QName(String):
    """Typecode for xsd:QName; parsed values are (namespaceURI, localName) tuples."""
    parselist = [(None, 'QName')]
    type = (SCHEMA.XSD3, 'QName')
    logger = _GetLogger('ZSI.TC.QName')

    def __init__(self, pname=None, strip=1, **kw):
        String.__init__(self, pname, strip, **kw)
        # Prefix chosen by set_prefix() for the value being serialized.
        self.prefix = None

    def get_formatted_content(self, pyobj):
        """Format a (namespaceURI, localName) tuple as ``prefix:localName``.

        The prefix is the one recorded by set_prefix(); when none is
        recorded, or *pyobj* is not a tuple, *pyobj* is formatted as is.
        """
        value = pyobj
        if isinstance(pyobj, tuple):
            (namespaceURI, localName) = pyobj
            if self.prefix is not None:
                value = '%s:%s' % (self.prefix, localName)
        return String.get_formatted_content(self, value)

    def set_prefix(self, elt, pyobj):
        """Record the prefix *elt* uses for the namespace of a tuple *pyobj*."""
        if isinstance(pyobj, tuple):
            (namespaceURI, localName) = pyobj
            self.prefix = elt.getPrefix(namespaceURI)

    def text_to_data(self, text, elt, ps):
        """Resolve ``prefix:name`` text into (namespaceURI, localName).

        A missing prefix is looked up as ''.  Raises EvaluateException when
        the prefix is not declared for *elt*.
        """
        (prefix, localName) = SplitQName(text)
        nsdict = ps.GetElementNSdict(elt)
        prefix = prefix or ''
        try:
            namespaceURI = nsdict[prefix]
        except KeyError:
            raise EvaluateException('cannot resolve prefix(%s)' % prefix, ps.Backtrace(elt))
        v = (namespaceURI, localName)
        if self.pyclass is not None:
            return self.pyclass(v)
        return v

    def serialize_text_node(self, elt, sw, pyobj):
        """Record the namespace prefix, then write the text node like a String."""
        self.set_prefix(elt, pyobj)
        return String.serialize_text_node(self, elt, sw, pyobj)
class Token(String):
    """Typecode for xsd:token; behaves exactly like String apart from its type."""
    parselist = [(None, 'token')]
    type = (SCHEMA.XSD3, 'token')
    logger = _GetLogger('ZSI.TC.Token')
class Base64String(String):
    """Typecode for SOAP-ENC:base64 encoded data."""
    parselist = [(None, 'base64Binary'), (SOAP.ENC, 'base64')]
    type = (SOAP.ENC, 'base64')
    logger = _GetLogger('ZSI.TC.Base64String')

    def text_to_data(self, text, elt, ps):
        """Decode base64 *text* after removing spaces, newlines and carriage returns."""
        for ws in (' ', '\n', '\r'):
            text = text.replace(ws, '')
        val = b64decode(text)
        if self.pyclass is not None:
            return self.pyclass(val)
        return val

    def get_formatted_content(self, pyobj):
        """Return the base64 encoding of *pyobj*, preceded by a newline."""
        return String.get_formatted_content(self, '\n' + b64encode(pyobj))
class Base64Binary(String):
    """Typecode for xsd:base64Binary data."""
    parselist = [(None, 'base64Binary')]
    type = (SCHEMA.XSD3, 'base64Binary')
    logger = _GetLogger('ZSI.TC.Base64Binary')

    def text_to_data(self, text, elt, ps):
        """Decode base64 *text*, wrapping the result in pyclass when set."""
        val = b64decode(text)
        if self.pyclass is not None:
            return self.pyclass(val)
        return val

    def get_formatted_content(self, pyobj):
        """Return the base64 encoding of *pyobj* without surrounding whitespace."""
        return b64encode(pyobj).strip()
class HexBinaryString(String):
    """Typecode for xsd:hexBinary data."""
    parselist = [(None, 'hexBinary')]
    type = (SCHEMA.XSD3, 'hexBinary')
    logger = _GetLogger('ZSI.TC.HexBinaryString')

    def text_to_data(self, text, elt, ps):
        """Decode hexadecimal *text*, wrapping the result in pyclass when set."""
        val = hexdecode(text)
        if self.pyclass is not None:
            return self.pyclass(val)
        return val

    def get_formatted_content(self, pyobj):
        """Return *pyobj* as upper-case hexadecimal text."""
        return String.get_formatted_content(self, hexencode(pyobj).upper())
class XMLString(String):
    """String typecode whose parsed text is handed to an XML reader."""
    logger = _GetLogger('ZSI.TC.XMLString')

    def __init__(self, pname=None, readerclass=None, **kw):
        """readerclass -- class whose instances offer ``fromString(text)``;
        defaults to ``xml.dom.ext.reader.PyExpat.Reader`` on first parse.
        """
        String.__init__(self, pname, **kw)
        self.readerclass = readerclass

    def parse(self, elt, ps):
        """Parse the element as a string and return ``reader.fromString(text)``."""
        if not self.readerclass:
            # Imported lazily so the reader package is only needed when used.
            from xml.dom.ext.reader import PyExpat
            self.readerclass = PyExpat.Reader
        v = String.parse(self, elt, ps)
        return self.readerclass().fromString(v)

    def get_formatted_content(self, pyobj):
        """Format *pyobj* exactly like String does."""
        return String.get_formatted_content(self, pyobj)
class Enumeration(String):
    """String typecode restricted to a fixed set of choices."""
    logger = _GetLogger('ZSI.TC.Enumeration')

    def __init__(self, choices, pname=None, **kw):
        """choices -- sequence of the allowed string values.

        With TypeCode.typechecks enabled, TypeError is raised when *choices*
        is not a sequence or contains a non-string.
        """
        String.__init__(self, pname, **kw)
        t = type(choices)
        if t in _seqtypes:
            self.choices = tuple(choices)
        elif TypeCode.typechecks:
            raise TypeError('Enumeration choices must be list or sequence, not ' + str(t))
        if TypeCode.typechecks:
            for c in self.choices:
                if type(c) not in _stringtypes:
                    raise TypeError('Enumeration choice ' + str(c) + ' is not a string')

    def parse(self, elt, ps):
        """Parse like String; EvaluateException when the value is not a choice."""
        val = String.parse(self, elt, ps)
        if val not in self.choices:
            raise EvaluateException('Value not in enumeration list', ps.Backtrace(elt))
        return val

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Serialize like String; EvaluateException when *pyobj* is not a choice.

        Returns whatever String.serialize returns (the new element, or None).
        """
        if pyobj not in self.choices:
            raise EvaluateException('Value not in enumeration list', sw.Backtrace(elt))
        return String.serialize(self, elt, sw, pyobj, name=name, orig=orig, **kw)
# Sentinel used in Integer.ranges for a bound that is not checked.
_ignored = []
class Integer(SimpleType):
    """Typecode for the XML Schema integer types.

    Class attributes:
        ranges -- maps a schema type name to its (minimum, maximum) bounds;
            ``_ignored`` marks a bound that is not checked.
    """
    # Plain literals: Python promotes to long automatically, and the
    # ``L`` suffix is a syntax error on Python 3.
    ranges = {
        'unsignedByte': (0, 255),
        'unsignedShort': (0, 65535),
        'unsignedInt': (0, 4294967295),
        'unsignedLong': (0, 18446744073709551615),
        'byte': (-128, 127),
        'short': (-32768, 32767),
        'int': (-2147483648, 2147483647),
        'long': (-9223372036854775808, 9223372036854775807),
        'negativeInteger': (_ignored, -1),
        'nonPositiveInteger': (_ignored, 0),
        'nonNegativeInteger': (0, _ignored),
        'positiveInteger': (1, _ignored),
        'integer': (_ignored, _ignored),
    }
    parselist = [(None, k) for k in ranges.keys()]
    seriallist = [types.IntType, types.LongType]
    logger = _GetLogger('ZSI.TC.Integer')

    def __init__(self, pname=None, format='%d', **kw):
        """format -- %-format string used by get_formatted_content()."""
        TypeCode.__init__(self, pname, **kw)
        self.format = format

    def text_to_data(self, text, elt, ps):
        """Convert *text* with pyclass when set, otherwise with int().

        Raises EvaluateException when int() cannot parse the text.  int()
        already returns a long for large values, so the former long()
        fallback was dead code and is gone.
        """
        if self.pyclass is not None:
            return self.pyclass(text)
        try:
            return int(text)
        except (ValueError, TypeError):
            raise EvaluateException('Unparseable integer', ps.Backtrace(elt))

    def parse(self, elt, ps):
        """Parse *elt* into an integer and check it against the type's range.

        Returns Nilled for a nil element and None for an absent optional
        one.  Raises EvaluateException on a type mismatch, unparseable text,
        underflow or overflow.
        """
        (ns, typ) = self.checkname(elt, ps)
        if self.nilled(elt, ps):
            return Nilled
        elt = self.SimpleHREF(elt, ps, 'integer')
        if not elt:
            return None
        if typ is None:
            typ = self.type[1]
        elif self.type[1] is not None and typ != self.type[1]:
            raise EvaluateException('Integer type mismatch; got %s wanted %s' % (typ, self.type[1]), ps.Backtrace(elt))
        v = self.simple_value(elt, ps)
        v = self.text_to_data(v, elt, ps)
        (rmin, rmax) = Integer.ranges.get(typ, (_ignored, _ignored))
        if rmin is not _ignored and v < rmin:
            raise EvaluateException('Underflow, less than ' + repr(rmin), ps.Backtrace(elt))
        if rmax is not _ignored and v > rmax:
            raise EvaluateException('Overflow, greater than ' + repr(rmax), ps.Backtrace(elt))
        return v

    def get_formatted_content(self, pyobj):
        """Return *pyobj* rendered with self.format."""
        return self.format % pyobj
def _make_inf():
x = 2
x2 = x * x
i = 0
while i < 100 and x != x2:
x = x2
x2 = x * x
i = i + 1
if x != x2:
raise ValueError("This machine's floats go on forever!")
return x
_magicnums = { }
try:
_magicnums['INF'] = float('INF')
_magicnums['-INF'] = float('-INF')
except:
_magicnums['INF'] = _make_inf()
_magicnums['-INF'] = -_magicnums['INF']
def isnan(x):
    """Return a true value when *x* is a floating-point NaN.

    ``x != x`` is the standard test on IEEE-754 platforms.  The two older
    tricks are kept for platforms with non-standard NaN comparisons: a NaN
    that orders below itself times one, or that compares equal to
    everything (here: to both 1.0 and 2.0).  The previous version relied on
    the tricks alone and therefore reported False for NaN on conforming
    platforms.
    """
    if x != x:
        return True
    if x * 1.0 < x:
        return True
    return x == 1.0 and x == 2.0
class Decimal(SimpleType):
    """Typecode for floating-point values (decimal, float, double).

    Class attributes:
        ranges -- maps a type name to (smallest positive, most negative,
            most positive) limits used for under/overflow checks.
        zeropat -- matches any non-zero digit; used to detect text that was
            rounded to zero.
    """
    parselist = [(None, 'decimal'), (None, 'float'), (None, 'double')]
    seriallist = _floattypes
    type = None
    ranges = {
        'float': (7.00649e-46, -3.40282e+38, 3.40282e+38),
        'double': (0, -1.79769e+308, 1.79769e+308),
    }
    zeropat = re.compile('[1-9]')
    logger = _GetLogger('ZSI.TC.Decimal')

    def __init__(self, pname=None, format='%f', **kw):
        """format -- %-format string used by get_formatted_content()."""
        TypeCode.__init__(self, pname, **kw)
        self.format = format

    def text_to_data(self, text, elt, ps):
        """Convert *text* with pyclass when set, else to a float.

        'INF' and '-INF' map to the values in _magicnums.  Raises
        EvaluateException when float() cannot parse the text.
        """
        if self.pyclass is not None:
            return self.pyclass(text)
        m = _magicnums.get(text)
        if m:
            return m
        try:
            return float(text)
        except (ValueError, TypeError):
            raise EvaluateException('Unparseable floating point number', ps.Backtrace(elt))

    def parse(self, elt, ps):
        """Parse *elt* into a floating-point value with range checks.

        Returns None for an absent optional element and Nilled for a nil
        one.  Raises EvaluateException on a type mismatch, unparseable text,
        a value that silently became inf/nan/zero, or a range violation.
        """
        (ns, typ) = self.checkname(elt, ps)
        elt = self.SimpleHREF(elt, ps, 'floating-point')
        if not elt:
            return None
        tag = getattr(self.__class__, 'type')
        if tag:
            if typ is None:
                typ = tag
            elif tag != (ns, typ):
                raise EvaluateException('Floating point type mismatch; got (%s,%s) wanted %s' % (ns, typ, tag), ps.Backtrace(elt))
        if self.nilled(elt, ps):
            return Nilled
        v = self.simple_value(elt, ps)
        # text_to_data() already attaches the backtrace to the exception it
        # raises; the old handler tried ``ex.args.append(...)`` on a tuple
        # and so replaced the real error with an AttributeError.
        fp = self.text_to_data(v, elt, ps)
        m = _magicnums.get(v)
        if m:
            return m
        if str(fp).lower() in ('inf', '-inf', 'nan', '-nan'):
            raise EvaluateException('Floating point number parsed as "' + str(fp) + '"', ps.Backtrace(elt))
        if fp == 0 and Decimal.zeropat.search(v):
            raise EvaluateException('Floating point number parsed as zero', ps.Backtrace(elt))
        # NOTE(review): when typ was defaulted from the class ``type`` tuple
        # it cannot match the string keys of ``ranges``; behaviour kept.
        (rtiny, rneg, rpos) = Decimal.ranges.get(typ, (None, None, None))
        if rneg and fp < 0 and fp < rneg:
            raise EvaluateException('Negative underflow', ps.Backtrace(elt))
        if rtiny and fp > 0 and fp < rtiny:
            raise EvaluateException('Positive underflow', ps.Backtrace(elt))
        if rpos and fp > 0 and fp > rpos:
            raise EvaluateException('Overflow', ps.Backtrace(elt))
        return fp

    def get_formatted_content(self, pyobj):
        """Return 'INF', '-INF' or 'NaN' for special values, else self.format % pyobj."""
        if pyobj == _magicnums['INF']:
            return 'INF'
        if pyobj == _magicnums['-INF']:
            return '-INF'
        if isnan(pyobj):
            return 'NaN'
        return self.format % pyobj
class Boolean(SimpleType):
    """Typecode for xsd:boolean."""
    parselist = [(None, 'boolean')]
    seriallist = [bool]
    type = (SCHEMA.XSD3, 'boolean')
    logger = _GetLogger('ZSI.TC.Boolean')

    def text_to_data(self, text, elt, ps):
        """Convert 'true'/'false' or integer text to a boolean.

        The result is wrapped in pyclass when one is set.  Raises
        EvaluateException when the text is neither a keyword nor an integer.
        int() already handles arbitrarily large values, so the former
        long() fallback was dead code and is gone.
        """
        if text == 'false':
            flag = False
        elif text == 'true':
            flag = True
        else:
            try:
                flag = bool(int(text))
            except (ValueError, TypeError):
                raise EvaluateException('Unparseable boolean', ps.Backtrace(elt))
        if self.pyclass is None:
            return flag
        return self.pyclass(flag)

    def parse(self, elt, ps):
        """Parse *elt* into a boolean.

        Returns None for an absent optional element and Nilled for a nil
        one; the text is lower-cased before conversion.
        """
        self.checkname(elt, ps)
        elt = self.SimpleHREF(elt, ps, 'boolean')
        if not elt:
            return None
        if self.nilled(elt, ps):
            return Nilled
        v = self.simple_value(elt, ps).lower()
        return self.text_to_data(v, elt, ps)

    def get_formatted_content(self, pyobj):
        """Return 'true' for a true *pyobj* and 'false' otherwise."""
        if pyobj:
            return 'true'
        return 'false'
class XML(TypeCode):
    """Typecode for embedded XML: parses to, and serializes from, DOM nodes.

    Class attributes:
        copyit -- default for the ``copyit`` option: when true, parse()
            returns a deep copy of the embedded node.
    """
    copyit = 0
    logger = _GetLogger('ZSI.TC.XML')

    def __init__(self, pname=None, comments=0, inline=0, wrapped=True, **kw):
        """comments, inline -- stored on the instance unchanged.
        wrapped -- when true the XML lives inside a wrapper element named
            after this typecode; when False the element itself is the value.
        kw -- ``resolver`` and ``copyit`` entries are stored when present.
        """
        TypeCode.__init__(self, pname, **kw)
        self.comments = comments
        self.inline = inline
        if 'resolver' in kw:
            self.resolver = kw['resolver']
        self.wrapped = wrapped
        self.copyit = kw.get('copyit', XML.copyit)

    def parse(self, elt, ps):
        """Return the single embedded child element of *elt*.

        Unwrapped typecodes return *elt* itself.  hrefs are followed (remote
        ones through ``ps.ResolveHREF``); an absent optional document yields
        None.  Raises EvaluateException when the document is missing or the
        wrapper does not hold exactly one child element.  The encodingStyle
        of the element is deliberately not validated.
        """
        if self.wrapped is False:
            return elt
        c = _child_elements(elt)
        if not c:
            href = _find_href(elt)
            if not href:
                if self.minOccurs == 0:
                    return None
                raise EvaluateException('Embedded XML document missing', ps.Backtrace(elt))
            if href[0] != '#':
                return ps.ResolveHREF(href, self)
            elt = ps.FindLocalHREF(href, elt)
            c = _child_elements(elt)
        if len(c) != 1:
            raise EvaluateException('Embedded XML has more than one child', ps.Backtrace(elt))
        if self.copyit:
            return c[0].cloneNode(1)
        return c[0]

    def serialize(self, elt, sw, pyobj, name=None, unsuppressedPrefixes=[], **kw):
        """Write *pyobj* under *elt*, inside a wrapper element when wrapped.

        The default list for unsuppressedPrefixes is never mutated here.
        """
        objid = _get_idstr(pyobj)
        (ns, n) = self.get_name(name, objid)
        xmlelt = elt
        if self.wrapped:
            xmlelt = elt.createAppendElement(ns, n)
        self.cb(xmlelt, sw, pyobj, unsuppressedPrefixes)

    def cb(self, elt, sw, pyobj, unsuppressedPrefixes=[]):
        """Append *pyobj* to *elt*: as text for strings, else as an imported node.

        After importing a node, the ``xmlns:*`` declarations of its original
        ancestors are copied onto the imported child unless the child
        already has an attribute of that name.
        """
        if type(pyobj) in _stringtypes:
            elt.createAppendTextNode(pyobj)
            return None
        doc = elt.getDocument()
        node = doc.importNode(pyobj, deep=1)
        child = elt.node.appendChild(node)
        parent = pyobj.parentNode
        # A detached node has no parent at all; stop instead of crashing.
        while parent is not None and parent.nodeType == _Node.ELEMENT_NODE:
            for attr in parent.attributes:
                if not attr.name.startswith('xmlns:'):
                    continue
                # Re-read the keys each time: earlier copies add attributes.
                if attr.name not in child.attributes.keys():
                    child.setAttributeNode(attr.cloneNode(1))
            parent = parent.parentNode
class AnyType(TypeCode):
    """Typecode for xsd:anyType: a named element whose content type varies.

    Class attributes:
        all, other -- the '#all' and '#other' namespace wildcards.
    """
    all = '#all'
    other = '#other'
    type = (SCHEMA.XSD3, 'anyType')
    logger = _GetLogger('ZSI.TC.AnyType')

    def __init__(self, pname=None, namespaces=['#all'], minOccurs=1, maxOccurs=1, strip=1, **kw):
        """namespaces -- namespaces accepted by serialize(); '#all' accepts any.

        The sequence is stored as given (the default list is shared between
        instances and must not be mutated).  *strip* is accepted but unused.
        """
        TypeCode.__init__(self, pname=pname, minOccurs=minOccurs, maxOccurs=maxOccurs, **kw)
        self.namespaces = namespaces

    def get_formatted_content(self, pyobj):
        """Format *pyobj* with its own typecode, or with Any() when it has none."""
        what = getattr(pyobj, 'typecode', Any())
        return what.get_formatted_content(pyobj)

    def text_to_data(self, text, elt, ps):
        """Return *text* unchanged."""
        return text

    def serialize(self, elt, sw, pyobj, **kw):
        """Serialize *pyobj* with its own typecode, or with a typed Any.

        Raises EvaluateException when the namespace of the object's xsi:type
        is not allowed by self.namespaces.
        """
        (nsuri, typeName) = _get_xsitype(pyobj)
        if self.all not in self.namespaces and nsuri not in self.namespaces:
            raise EvaluateException('<anyType> unsupported use of namespaces "%s"' % self.namespaces)
        what = getattr(pyobj, 'typecode', None)
        if what is None:
            what = Any(pname=(self.nspname, self.pname), unique=True, aslist=False)
            kw['typed'] = True
            what.serialize(elt, sw, pyobj, **kw)
            return None
        # This element's own name wins; the object's typecode supplies the
        # parts this typecode leaves unset.
        name = (self.nspname or what.nspname, self.pname or what.pname)
        what.serialize(elt, sw, pyobj, name=name, **kw)

    def parse(self, elt, ps):
        """Parse *elt* with the typecode matching its xsi:type.

        The element name must match this typecode exactly, otherwise
        EvaluateException is raised.  Types in an XSD/SOAP namespace are
        parsed with Any, the Apache Map type with Apache.Map, and unknown
        types into a dictionary or list.
        """
        (nspname, pname) = _get_element_nsuri_name(elt)
        if nspname != self.nspname or pname != self.pname:
            raise EvaluateException('<anyType> instance is (%s,%s) found (%s,%s)' % (self.nspname, self.pname, nspname, pname), ps.Backtrace(elt))
        (prefix, typeName) = SplitQName(_find_type(elt))
        namespaceURI = _resolve_prefix(elt, prefix)
        pyclass = GTD(namespaceURI, typeName)
        if not pyclass:
            map_ns, map_name = Apache.Map.type[0], Apache.Map.type[1]
            if _is_xsd_or_soap_ns(namespaceURI):
                pyclass = Any
            elif (str(namespaceURI).lower() == str(map_ns).lower()
                    and str(typeName).lower() == str(map_name).lower()):
                pyclass = Apache.Map
            else:
                return Any().parse_into_dict_or_list(elt, ps)
        what = pyclass(pname=(self.nspname, self.pname))
        return what.parse(elt, ps)
class AnyElement(AnyType):
    """Typecode for the xsd:any wildcard element."""
    tag = (SCHEMA.XSD3, 'any')
    logger = _GetLogger('ZSI.TC.AnyElement')

    def __init__(self, namespaces=['#all'], pname=None, minOccurs=1, maxOccurs=1, strip=1, processContents='strict', **kw):
        """processContents -- 'lax', 'skip' or 'strict'; anything else raises
        ValueError.  The remaining arguments are passed to AnyType.
        """
        if processContents not in ('lax', 'skip', 'strict'):
            # The offending value used to be left out of the message.
            raise ValueError('processContents(%s) must be lax, skip, or strict' % (processContents,))
        self.processContents = processContents
        AnyType.__init__(self, namespaces=namespaces, pname=pname, minOccurs=minOccurs, maxOccurs=maxOccurs, strip=strip, **kw)

    def serialize(self, elt, sw, pyobj, **kw):
        """Serialize *pyobj* with its typecode, a registered one, or Any.

        Raises TypeError when *pyobj* is itself a typecode instance.
        """
        if isinstance(pyobj, TypeCode):
            raise TypeError('pyobj is a typecode instance.')
        what = getattr(pyobj, 'typecode', None)
        if what is not None and type(pyobj) is types.InstanceType:
            what = Any.serialmap.get(pyobj.__class__)
            if not what:
                key = (types.ClassType, pyobj.__class__.__name__)
                what = Any.serialmap.get(key)
        self.logger.debug('processContents: %s', self.processContents)
        if what is None:
            what = Any(pname=(self.nspname, self.pname))
        self.logger.debug('serialize with %s', what.__class__.__name__)
        what.serialize(elt, sw, pyobj, **kw)

    def _attach_typecode(self, pyobj, what):
        """Set ``pyobj.typecode``; wrap objects that reject attributes."""
        try:
            pyobj.typecode = what
        except AttributeError:
            pyobj = WrapImmutable(pyobj, what)
        return pyobj

    def parse(self, elt, ps):
        """Parse *elt* according to processContents.

        Unless skipping, a global element declaration is tried first, then
        the element's xsi:type.  Otherwise the element is parsed with XML
        (skip) or Any; when Any fails the element is returned as a dict (if
        it has children) or as a String.  Parsed objects carry the typecode
        that produced them, except None and plain dicts.
        """
        import sys
        skip = self.processContents == 'skip'
        (nspname, pname) = _get_element_nsuri_name(elt)
        what = GED(nspname, pname)
        if not skip and what is not None:
            return self._attach_typecode(what.parse(elt, ps), what)
        (prefix, typeName) = SplitQName(_find_type(elt))
        if not skip and typeName:
            namespaceURI = _resolve_prefix(elt, prefix or 'xmlns')
            # User-defined types first, then the built-in primitives.
            pyclass = GTD(namespaceURI, typeName) or Any
            what = pyclass(pname=(nspname, pname))
            pyobj = self._attach_typecode(what.parse(elt, ps), what)
            what.typed = True
            return pyobj
        if skip:
            what = XML(pname=(nspname, pname), wrapped=False)
        else:
            # 'lax' and 'strict' are currently handled identically.
            what = Any(pname=(nspname, pname), unique=True)
        try:
            pyobj = what.parse(elt, ps)
        except EvaluateException:
            # sys.exc_info() keeps this valid on both Python 2.5 and 3.
            ex = sys.exc_info()[1]
            self.logger.debug('error parsing: %s' % str(ex))
            if len(_children(elt)) != 0:
                self.logger.debug('parse <any>, return as dict')
                return Any(aslist=False).parse_into_dict_or_list(elt, ps)
            self.logger.debug('Give up, parse (%s,%s) as a String', what.nspname, what.pname)
            what = String(pname=(nspname, pname), typed=False)
            return WrapImmutable(what.parse(elt, ps), what)
        if pyobj is None:
            return None
        if type(pyobj) is dict:
            return pyobj
        return self._attach_typecode(pyobj, what)
class Union(SimpleType):
    """Simple type whose value may be any one of several member types.

    Class attributes:
        memberTypes -- list of (namespace, name) tuples naming the member
            types; must be set by subclasses.
    """
    memberTypes = None
    logger = _GetLogger('ZSI.TC.Union')

    def __init__(self, pname=None, minOccurs=1, maxOccurs=1, **kw):
        SimpleType.__init__(self, pname=pname, minOccurs=minOccurs, maxOccurs=maxOccurs, **kw)
        # Typecode instances for memberTypes, built lazily.
        self.memberTypeCodes = []

    def setMemberTypeCodes(self):
        """Build one typecode per member type (a no-op once built).

        Raises EvaluateException when memberTypes is unset, a member type
        cannot be located, or a member is a complex type (Struct).
        """
        if len(self.memberTypeCodes) > 0:
            return None
        if self.__class__.memberTypes is None:
            raise EvaluateException('uninitialized class variable memberTypes [(namespace,name),]')
        for nsuri, name in self.__class__.memberTypes:
            tcclass = GTD(nsuri, name)
            if tcclass is None:
                tc = Any.parsemap.get((nsuri, name)) or Any.parsemap.get((None, name))
                # This check used to sit after ``tc.__class__(...)``, where a
                # missing typecode had already failed with a TypeError.
                if tc is None:
                    raise EvaluateException('Typecode class for Union memberType (%s,%s) is missing' % (nsuri, name))
                tcclass = tc.__class__
            typecode = tcclass(pname=(self.nspname, self.pname))
            if isinstance(typecode, Struct):
                raise EvaluateException('Illegal: Union memberType (%s,%s) is complexType' % (nsuri, name))
            self.memberTypeCodes.append(typecode)

    def _promote(self, indx, typecode):
        """Move the typecode that just succeeded to the front of the list."""
        if indx > 0:
            self.memberTypeCodes.remove(typecode)
            self.memberTypeCodes.insert(0, typecode)

    def parse(self, elt, ps, **kw):
        """Parse *elt* with the first member typecode that accepts it.

        The successful typecode is moved to the front so it is tried first
        next time.  When every member fails the last failure is re-raised;
        when there are no members EvaluateException is raised.
        """
        import sys
        self.setMemberTypeCodes()
        self.checkname(elt, ps)
        last_error = None
        for indx in range(len(self.memberTypeCodes)):
            typecode = self.memberTypeCodes[indx]
            try:
                pyobj = typecode.parse(elt, ps)
            except (ParseException, Exception):
                last_error = sys.exc_info()[1]
                continue
            self._promote(indx, typecode)
            return pyobj
        # A bare ``raise`` has no active exception here on Python 3.
        if last_error is not None:
            raise last_error
        raise EvaluateException('Union has no member types to parse with', ps.Backtrace(elt))

    def get_formatted_content(self, pyobj, **kw):
        """Format *pyobj* with the first member typecode that accepts it.

        Members failing with ParseException or TypeError are skipped; the
        successful one is moved to the front.  When every member fails the
        last failure is re-raised; when there are no members
        EvaluateException is raised.
        """
        import sys
        self.setMemberTypeCodes()
        last_error = None
        for indx in range(len(self.memberTypeCodes)):
            typecode = self.memberTypeCodes[indx]
            try:
                content = typecode.get_formatted_content(copy.copy(pyobj))
            except (ParseException, TypeError):
                last_error = sys.exc_info()[1]
                continue
            self._promote(indx, typecode)
            return content
        if last_error is not None:
            raise last_error
        raise EvaluateException('Union has no member types to format with')
class List(SimpleType):
    """Simple type holding a whitespace-separated list of items.

    Class attributes:
        itemType -- typecode, or (namespaceURI, name) tuple naming the
            typecode, used for each item; may be overridden per instance.
    """
    itemType = None
    logger = _GetLogger('ZSI.TC.List')

    def __init__(self, pname=None, itemType=None, **kw):
        """Resolve the item typecode.

        A (namespaceURI, name) item type is looked up with GTD; if that
        fails and the namespace is an XSD/SOAP one, the module's TYPES are
        searched (an exact type match wins, otherwise the last class whose
        type name matches).  Raises EvaluateException when nothing is found
        or the typecode lacks text_to_data().
        """
        SimpleType.__init__(self, pname=pname, **kw)
        self.itemType = itemType or self.itemType
        self.itemTypeCode = self.itemType
        itemTypeCode = None
        if type(self.itemTypeCode) in _seqtypes:
            (namespaceURI, name) = self.itemTypeCode
            try:
                itemTypeCode = GTD(*self.itemType)(None)
            except Exception:
                if _is_xsd_or_soap_ns(namespaceURI) is False:
                    raise
                for pyclass in TYPES:
                    if pyclass.type == self.itemTypeCode:
                        itemTypeCode = pyclass(None)
                        break
                    elif pyclass.type[1] == name:
                        itemTypeCode = pyclass(None)
                if itemTypeCode is None:
                    raise EvaluateException('Failed to locate %s' % str(self.itemTypeCode))
            if hasattr(itemTypeCode, 'text_to_data') is False:
                raise EvaluateException('TypeCode class %s missing text_to_data method' % itemTypeCode)
            self.itemTypeCode = itemTypeCode

    def text_to_data(self, text, elt, ps):
        """Split *text* on whitespace and convert each item with the item typecode."""
        v = [self.itemTypeCode.text_to_data(item, elt, ps) for item in text.split()]
        if self.pyclass is not None:
            return self.pyclass(v)
        return v

    def parse(self, elt, ps):
        """Parse *elt* into a list of items.

        An empty element yields [] (or Nilled when nil); hrefs are followed,
        remote ones through ``ps.ResolveHREF``.
        """
        self.checkname(elt, ps)
        if len(_children(elt)) == 0:
            href = _find_href(elt)
            if not href:
                if self.nilled(elt, ps) is False:
                    return []
                if self.nillable is True:
                    return Nilled
                raise EvaluateException('Required string missing', ps.Backtrace(elt))
            if href[0] != '#':
                return ps.ResolveHREF(href, self)
            elt = ps.FindLocalHREF(href, elt)
            self.checktype(elt, ps)
            if self.nilled(elt, ps):
                return Nilled
            if len(_children(elt)) == 0:
                return []
        v = self.simple_value(elt, ps)
        return self.text_to_data(v, elt, ps)

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Append an element holding the space-separated formatted items.

        None is written as nil when this typecode is nillable.  Raises
        EvaluateException when *pyobj* is neither a sequence nor None.
        """
        if pyobj is not None and type(pyobj) not in _seqtypes:
            raise EvaluateException('expecting a list or None')
        objid = _get_idstr(pyobj)
        (ns, n) = self.get_name(name, objid)
        el = elt.createAppendElement(ns, n)
        if self.nillable is True and pyobj is None:
            self.serialize_as_nil(el)
            return None
        tc = self.itemTypeCode
        # Joining puts the separator between items only; the old loop also
        # wrote one after the last item.
        text = ' '.join([tc.get_formatted_content(item) for item in pyobj])
        el.createAppendTextNode(text)
def RegisterType(C, clobber=0, *args, **keywords):
    """Register typecode class *C* with Any for parsing and serializing.

    One instance ``C(*args, **keywords)`` is created and stored in
    Any.parsemap under every entry of C's own ``parselist`` and in
    Any.serialmap under every entry of C's own ``seriallist`` (a type or
    class is keyed directly, a class name as (types.ClassType, name)).
    Only C's own ``__dict__`` is read; inherited lists are ignored.

    An entry already registered by an instance of C is left alone.  An
    entry registered by another class raises TypeError unless *clobber* is
    true, in which case it is replaced.  A seriallist entry that is neither
    a type, a class nor a string raises TypeError.
    """
    instance = C(*args, **keywords)
    for t in C.__dict__.get('parselist', []):
        prev = Any.parsemap.get(t)
        if prev:
            if prev.__class__ == C:
                continue
            if not clobber:
                raise TypeError(str(C) + ' duplicating parse registration for ' + str(t))
        Any.parsemap[t] = instance
    for t in C.__dict__.get('seriallist', []):
        ti = type(t)
        if ti in [types.TypeType, types.ClassType]:
            key = t
        elif ti in _stringtypes:
            key = (types.ClassType, t)
        else:
            raise TypeError(str(t) + ' is not a class name')
        prev = Any.serialmap.get(key)
        if prev:
            if prev.__class__ == C:
                continue
            if not clobber:
                raise TypeError(str(C) + ' duplicating serial registration for ' + str(t))
        Any.serialmap[key] = instance
from TCnumbers import *
from TCtimes import *
from schema import GTD, GED, WrapImmutable
from TCcompound import *
from TCapache import *
# Backwards-compatible aliases for the schema lookup helpers.
(_get_type_definition, _get_global_element_declaration, Wrap) = (GTD, GED, WrapImmutable)


def f(x):
    """Tell whether *x* is a classic TypeCode subclass with a non-None ``type``."""
    return (type(x) == types.ClassType
            and issubclass(x, TypeCode)
            and getattr(x, 'type', None) is not None)


# Every typecode class visible in this module, in sorted-name order.
# Looking names up in globals() replaces eval() on each name from dir().
TYPES = list(filter(f, map(globals().get, dir())))

if __name__ == '__main__':
    print(_copyright)